"""
Case Type   : 基础功能
Case Name   : 修改订阅,验证connection连接信息password参数
Create At   : 2022/2/14
Owner       : opentestcase026
Description :
    1、创建用户,赋予所有权限
    2、在两个集群中创建相同字段表
    3、创建发布端(集群A)
    4、创建订阅   expect:创建订阅成功
    5.修改订阅connection连接信息password参数为修改后的明文密码   expect:失败
    6.修改订阅connection连接信息password参数为错误密码   expect:失败
    7.1、修改发布端用户密码   expect:成功
    7.2、修改订阅connection连接信息dbname参数为password参数为加密后的修改密码
     expect:成功
    8、向发布端表中插入数据   expect:成功
    9、查询集群B中表中数据是否同步   expect:数据已同步
Expect      :
    1：成功
    2：成功
    3：成功
    4：成功
    5：失败
    6：失败
    7.1：成功
    7.2：成功
    8：插入数据成功
    9：数据同步成功
History     :
    Modified by opentestcase026 2022/3/15:研发代码变更,修改错误提示
    Modified by opentestcase026 2022/3/15:研发代码变更,不支持port端口创建订阅
    Modified by opentestcase026 2022/4/24:研发代码更改,修改断言信息适配
    Modified by opentestcase012 2022/8/4:增加基础数据同步功能后订阅端有基础数据的同时
    再在订阅端修改表数据,主键冲突,关闭基础数据同步 & 修改step6报错信息
"""
import os
import unittest

from testcase.utils.Common import Common
from testcase.utils.CommonSH import CommonSH
from testcase.utils.Constant import Constant
from testcase.utils.Logger import Logger
from yat.test import Node
from yat.test import macro

Primary_SH = CommonSH('PrimaryDbUser')


@unittest.skipIf(3 != Primary_SH.get_node_num(), '非1+2环境不执行')
class Pubsubclass(unittest.TestCase):
    def setUp(self):
        self.log = Logger()
        self.log.info("-----------this is setup-----------")
        self.log.info(f'-----{os.path.basename(__file__)} start-----')
        self.pri_userdb_pub = Node(node='PrimaryDbUser')
        self.pri_userdb_sub = Node(node='remote1_PrimaryDbUser')
        self.constant = Constant()
        self.commsh_pub = CommonSH('PrimaryDbUser')
        self.commsh_sub = CommonSH('remote1_PrimaryDbUser')
        self.com_pub = Common()
        self.subname1 = "sub_case108_1"
        self.pubname1 = "pub_case108_1"
        self.tb_name1 = "public.t_case108_1"
        self.parent_path_pub = os.path.dirname(macro.DB_INSTANCE_PATH)
        self.parent_path_sub = os.path.dirname(macro.DB_INSTANCE_PATH_REMOTE1)
        self.port = str(int(self.pri_userdb_pub.db_port) + 1)
        self.wal_level = self.com_pub.show_param("wal_level")
        self.user_name = "u_case108_1"
        self.user_param_pub = f'-U {self.pri_userdb_pub.db_user} ' \
            f'-W {self.pri_userdb_pub.db_password}'
        self.user_param_sub = f'-U {self.pri_userdb_sub.db_user} ' \
            f'-W {self.pri_userdb_sub.db_password}'

        cmd = f"cp " \
            f"{os.path.join(macro.DB_INSTANCE_PATH, 'pg_hba.conf')} " \
            f"{os.path.join(self.parent_path_pub, 'pg_hba.conf')};"
        self.log.info(cmd)
        result = self.pri_userdb_pub.sh(cmd).result()
        self.log.info(result)
        cmd = f"cp " \
            f"{os.path.join(macro.DB_INSTANCE_PATH_REMOTE1, 'pg_hba.conf')}" \
            f" {os.path.join(self.parent_path_sub, 'pg_hba.conf')};"
        self.log.info(cmd)
        result = self.pri_userdb_sub.sh(cmd).result()
        self.log.info(result)
        self.key_path = os.path.join(
            self.parent_path_sub, 'app', 'bin', 'subscription.key.cipher')
        self.common_passwd = macro.COMMON_PASSWD + '1'
        self.user_param_u = f'-U {self.user_name} ' \
            f'-W {self.common_passwd}'
        self.user_param_u_new = ''

        text = '--step:预置条件,修改pg_hba expect:成功'
        self.log.info(text)
        self.log.info("#######发布端: ")
        guc_res = self.commsh_pub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, '',
            f'host    replication  {self.user_name} '
            f'{self.pri_userdb_sub.db_host}/32 sha256')
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败:' + text)
        guc_res = self.commsh_pub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, '',
            f'host    {self.pri_userdb_pub.db_name}  {self.user_name} '
            f'{self.pri_userdb_sub.db_host}/32 sha256')
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败:' + text)
        result = self.commsh_pub.execute_gsguc(
            'set', self.constant.GSGUC_SUCCESS_MSG, 'wal_level=logical')
        self.assertTrue(result, '执行失败:' + text)
        result = self.commsh_pub.restart_db_cluster(True)
        flg = self.constant.START_SUCCESS_MSG in result or 'Degrade' in result
        self.assertTrue(flg, '执行失败:' + text)
        self.log.info("#######订阅端: ")
        guc_res = self.commsh_sub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, macro.DB_INSTANCE_PATH_REMOTE1,
            f'host    replication  {self.user_name} '
            f'{self.pri_userdb_pub.db_host}/32 sha256',
            macro.DB_ENV_PATH_REMOTE1)
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败:' + text)
        guc_res = self.commsh_sub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, macro.DB_INSTANCE_PATH_REMOTE1,
            f'host {self.pri_userdb_sub.db_name}  {self.user_name} '
            f'{self.pri_userdb_sub.db_host}/32 sha256',
            macro.DB_ENV_PATH_REMOTE1)
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败:' + text)

    def test_pubsub(self):
        text = "--step1:创建用户,赋予所有权限 expect:成功--"
        self.log.info(text)
        sql = f"create user {self.user_name} password " \
            f"'{self.common_passwd}';" \
            f"grant all privileges to {self.user_name};"
        result = self.commsh_pub.execut_db_sql(sql,
                                               sql_type=self.user_param_pub)
        self.log.info(result)
        self.assertIn(self.constant.CREATE_ROLE_SUCCESS_MSG, result,
                      '执行失败:' + text)
        result = self.commsh_sub.execut_db_sql(sql,
                                               self.user_param_sub, None,
                                               macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result)
        self.assertIn(self.constant.CREATE_ROLE_SUCCESS_MSG, result,
                      '执行失败:' + text)

        text = '--step2:在两个集群中创建相同字段表 expect:成功--'
        self.log.info(text)
        create_sql = f'create table {self.tb_name1}' \
            f'(id int primary key,name text);'
        result = self.commsh_pub.execut_db_sql(
            create_sql, sql_type=self.user_param_u)
        self.log.info(result)
        self.assertIn(self.constant.TABLE_CREATE_SUCCESS,
                      result, '执行失败:' + text)
        result = self.commsh_sub.execut_db_sql(create_sql,
                                               self.user_param_u, None,
                                               macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result)
        self.assertIn(self.constant.TABLE_CREATE_SUCCESS,
                      result, '执行失败:' + text)

        text = "--step3:创建发布端(集群A) expect:成功--"
        self.log.info(text)
        sql = f"CREATE PUBLICATION {self.pubname1} for all tables;"
        result = self.commsh_pub.execut_db_sql(sql,
                                               sql_type=self.user_param_u)
        self.log.info(result)
        self.assertIn(self.constant.create_pub_succ_msg, result,
                      '执行失败:' + text)
        self.assertNotIn(self.constant.SQL_WRONG_MSG[1], result,
                         '执行失败:' + text)

        text = "--step4:创建订阅   expect:创建订阅成功--"
        self.log.info(text)
        result = self.commsh_sub.execute_generate(
            self.common_passwd, env_path=macro.DB_ENV_PATH_REMOTE1)
        self.assertIn('', result, '执行失败:' + text)
        sql = f"CREATE SUBSCRIPTION {self.subname1} CONNECTION " \
            f"'host={self.pri_userdb_pub.db_host} " \
            f"port={self.port} " \
            f"user={self.user_name} " \
            f"dbname={self.pri_userdb_pub.db_name} " \
            f"password={self.common_passwd}' " \
            f"PUBLICATION {self.pubname1} with (copy_data=false); "
        result = self.commsh_sub.execut_db_sql(sql, self.user_param_u, None,
                                               macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result)
        self.assertNotIn(self.constant.SQL_WRONG_MSG[1],
                         result, '执行失败:' + text)
        self.assertIn(self.constant.create_sub_succ_msg, result,
                      '执行失败:' + text)

        text = "--step5: 修改订阅connection连接信息password参数为修改后的明" \
               "文密码   expect:失败--"
        self.log.info(text)
        cmd = f"rm -rf {self.key_path}"
        self.log.info(cmd)
        result = self.pri_userdb_sub.sh(cmd).result()
        self.log.info(result)
        sql = f"alter SUBSCRIPTION {self.subname1} CONNECTION " \
            f"'host={self.pri_userdb_pub.db_host} " \
            f"port={self.port} " \
            f"user={self.user_name} " \
            f"dbname={self.pri_userdb_pub.db_name} " \
            f"password={self.common_passwd}' ;" \
            f"select * from pg_SUBSCRIPTION;"
        result = self.commsh_sub.execut_db_sql(sql, self.user_param_u, None,
                                               macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result)
        self.assertIn('ERROR:  No key file subscription.key.cipher',
                      result, '执行失败:' + text)
        self.assertNotIn(self.common_passwd, result, '执行失败:' + text)

        text = "--step6: 修改订阅connection连接信息password参数为错误密码   " \
               "expect:失败--"
        self.log.info(text)
        result = self.commsh_sub.execute_generate(
            self.common_passwd, env_path=macro.DB_ENV_PATH_REMOTE1)
        self.assertIn('', result, '执行失败:' + text)
        sql = f"select pg_sleep(10);" \
            f"alter SUBSCRIPTION {self.subname1} CONNECTION " \
            f"'host={self.pri_userdb_pub.db_host} " \
            f"port={self.port} " \
            f"user={self.user_name} " \
            f"dbname={self.pri_userdb_pub.db_name} " \
            f"password={self.pri_userdb_pub.db_password}' ;" \
            f"select * from pg_SUBSCRIPTION;"
        result = self.commsh_sub.execut_db_sql(sql, self.user_param_u, None,
                                               macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result)
        self.assertIn('ERROR:  Failed to connect to publisher.',
                      result, '执行失败:' + text)
        self.assertNotIn(self.common_passwd, result, '执行失败:' + text)
        self.assertNotIn(self.pri_userdb_pub.db_password,
                         result, '执行失败:' + text)

        text = "--step7.1:修改发布端用户密码   expect:成功--"
        self.log.info(text)
        sql = f"alter user {self.user_name} identified by " \
            f"'{self.common_passwd}_1' replace '{self.common_passwd}';"
        result = self.commsh_pub.execut_db_sql(sql,
                                               sql_type=self.user_param_u)
        self.log.info(result)
        self.assertIn(self.constant.ALTER_ROLE_SUCCESS_MSG,
                      result, '执行失败:' + text)

        text = "--step7.2:修改订阅connection连接信息dbname参数为password参数" \
               "为加密后的修改密码 " \
               " expect:成功--"
        self.log.info(text)
        sql = f"select pg_sleep(10);" \
            f"alter SUBSCRIPTION {self.subname1} CONNECTION " \
            f"'host={self.pri_userdb_pub.db_host} " \
            f"port={self.port} " \
            f"user={self.user_name} " \
            f"dbname={self.pri_userdb_pub.db_name} " \
            f"password={self.common_passwd}_1' ;" \
            f"select * from pg_SUBSCRIPTION;"
        result = self.commsh_sub.execut_db_sql(sql, self.user_param_u, None,
                                               macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result)
        self.assertNotIn(self.constant.SQL_WRONG_MSG[1], result,
                         '执行失败:' + text)
        self.assertIn(self.constant.alter_sub_succ_msg,
                      result, '执行失败:' + text)

        text = "--step8:向发布端表中插入数据   expect:成功--"
        self.log.info(text)
        sql_insert = f"select pg_sleep(10);" \
            f"select * from pg_replication_slots ;" \
            f"insert into {self.tb_name1} " \
            f"values(generate_series(1,100),'a_'||generate_series(1,100));"
        self.user_param_u_new = f"-U {self.user_name} " \
            f"-W {self.common_passwd}_1"
        result = self.commsh_pub.execut_db_sql(sql_insert,
                                               sql_type=self.user_param_u_new)
        self.log.info(result)
        self.assertIn(self.constant.INSERT_SUCCESS_MSG, result, '执行失败' +
                      text)

        text = "--step9:查询集群B中表中数据是否同步   expect:数据同步--"
        self.log.info(text)
        sql_select = f"select count(*) from {self.tb_name1}"
        result = self.commsh_sub.execut_db_sql(sql_select,
                                               self.user_param_sub,
                                               None,
                                               macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result)
        self.assertIn('100', result, '执行失败' + text)
        self.assertIn('(1 row)', result, '执行失败' + text)

    def tearDown(self):
        self.log.info('------------this is tearDown-------------')
        text = "--环境清理--"
        self.log.info(text)
        sql = f"DROP SUBSCRIPTION if exists {self.subname1};"
        drop_sub_result = self.commsh_sub.execut_db_sql(
            sql, self.user_param_u, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(drop_sub_result)
        sql = f"DROP PUBLICATION if exists {self.pubname1};"
        if '' == self.user_param_u_new:
            drop_pub_result = self.commsh_pub.execut_db_sql(
                sql, sql_type=self.user_param_u)
        else:
            drop_pub_result = self.commsh_pub.execut_db_sql(
                sql, sql_type=self.user_param_u_new)
        self.log.info(drop_pub_result)
        sql = f"DROP table if exists {self.tb_name1};"
        result_sub = self.commsh_sub.execut_db_sql(
            sql, self.user_param_u, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result_sub)
        if '' == self.user_param_u_new:
            result_pub = self.commsh_pub.execut_db_sql(
                sql, sql_type=self.user_param_u)
        else:
            result_pub = self.commsh_pub.execut_db_sql(
                sql, sql_type=self.user_param_u_new)
        self.log.info(result_pub)
        sql = f"drop user {self.user_name} cascade;"
        result_pub1 = self.commsh_pub.execut_db_sql(
            sql, sql_type=self.user_param_pub)
        self.log.info(result_pub1)
        result_sub1 = self.commsh_sub.execut_db_sql(
            sql, self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result_sub1)
        cmd = f"mv " \
            f"{os.path.join(self.parent_path_pub, 'pg_hba.conf')} " \
            f"{os.path.join(macro.DB_INSTANCE_PATH, 'pg_hba.conf')};"
        self.log.info(cmd)
        result = self.pri_userdb_pub.sh(cmd).result()
        self.log.info(result)
        cmd = f"mv " \
            f"{os.path.join(self.parent_path_sub, 'pg_hba.conf')} " \
            f"{os.path.join(macro.DB_INSTANCE_PATH_REMOTE1, 'pg_hba.conf')} "
        self.log.info(cmd)
        result = self.pri_userdb_sub.sh(cmd).result()
        self.log.info(result)
        result = self.commsh_pub.execute_gsguc(
            'set', self.constant.GSGUC_SUCCESS_MSG,
            f'wal_level={self.wal_level}')
        self.assertTrue(result, '执行失败:' + text)
        self.commsh_pub.restart_db_cluster(True)
        self.commsh_sub.restart_db_cluster(True, macro.DB_ENV_PATH_REMOTE1)
        self.assertIn(self.constant.drop_pub_succ_msg, drop_pub_result,
                      '执行失败' + text)
        self.assertIn(self.constant.drop_sub_succ_msg, drop_sub_result,
                      '执行失败' + text)
        self.assertIn(self.constant.DROP_TABLE_SUCCESS, result_sub, '执行失败'
                      + text)
        self.assertIn(self.constant.DROP_TABLE_SUCCESS, result_pub, '执行失败'
                      + text)
        self.assertIn(self.constant.DROP_ROLE_SUCCESS_MSG, result_sub1,
                      '执行失败' + text)
        self.assertIn(self.constant.DROP_ROLE_SUCCESS_MSG, result_pub1,
                      '执行失败' + text)
        self.log.info(f'-----{os.path.basename(__file__)} end-----')
